package com.sencorsta.ids.core.tcp.opensocket;


import com.sencorsta.ids.core.configure.TypeProtocol;
import com.sencorsta.ids.core.log.Out;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Background sender that drains a queue of outgoing {@link OpenMessage}s and writes each
 * one to the channel it carries.
 *
 * <p>Producers enqueue through {@link #addSend(OpenMessage)}; a thread executing
 * {@link #run()} takes messages off the queue one at a time and passes them to
 * {@link #send(OpenMessage)}.
 *
 * @author TAO
 * @date 2019/6/12 17:16
 */
public class OpenSender implements Runnable {
    /** Queue length above which {@link #addSend(OpenMessage)} logs a warning. */
    private static final int __WARN_COUNT__ = 10000;

    /** Pending outgoing messages; unbounded, so {@code add} never fails for capacity reasons. */
    protected final BlockingQueue<OpenMessage> __QUEUE__ = new LinkedBlockingQueue<>();

    /** Label used in log output; defaults to the simple class name of the concrete sender. */
    protected String name;

    /**
     * @return the label used for this sender in log output
     */
    public String getName() {
        return name;
    }

    /** Creates a sender named after its runtime class. */
    public OpenSender() {
        this.name = this.getClass().getSimpleName();
    }

    /**
     * Sends queued messages until the executing thread is interrupted.
     *
     * <p>Any other exception raised while sending a message is logged and the loop
     * continues with the next message, so one bad message cannot stop the sender.
     */
    @Override
    public void run() {
        while (true) {
            try {
                // take() blocks until a message is available and never returns null,
                // so no null check is needed before sending.
                send(__QUEUE__.take());
            } catch (InterruptedException e) {
                // Restore the flag for whoever owns this thread and stop; swallowing the
                // interrupt here would make the sender impossible to shut down.
                Thread.currentThread().interrupt();
                Out.warn(name + " interrupted, stopping sender loop");
                return;
            } catch (Exception e) {
                Out.error(e);
                e.printStackTrace();
            }
        }
    }

    /**
     * Appends a message to the send queue and warns when the backlog grows too large.
     *
     * @param message the message to enqueue; must not be {@code null}
     *                (the underlying queue rejects {@code null} elements)
     */
    public void addSend(OpenMessage message) {
        __QUEUE__.add(message);
        // Read the size once so the traced value and the threshold check agree even
        // while other threads are adding or removing messages.
        int queued = size();
        Out.trace("发送队列长度：" + queued);
        if (queued > __WARN_COUNT__) {
            // Hook point for a real alerting mechanism.
            Out.warn(name + "发送队列太长: " + queued);
        }
    }

    /**
     * Writes a single message to its channel as a response.
     *
     * <p>The message is dropped (with a trace log) when {@code message.isChannelClosed()}
     * reports {@code true}, or when the channel carries no {@link OpenUser} attribute.
     * Otherwise the header type is set to {@code TypeProtocol.TYPE_RES} and the message is
     * written and flushed; the result of the write is not inspected.
     *
     * <p>Earlier commented-out experiments (manual packet splitting and a per-channel-type
     * WebSocket text-frame branch) were removed from this method; they remain available in
     * version control.
     *
     * @param message the message to send
     * @throws InterruptedException declared on the signature; nothing in this implementation
     *                              visibly throws it — presumably kept for overriding
     *                              subclasses that block (TODO confirm)
     */
    protected void send(OpenMessage message) throws InterruptedException {
        if (message.isChannelClosed()) {
            Out.trace("OpenSender开始send,channel为空:", "取消发送");
        } else {
            OpenUser openUser = message.channel.attr(OpenServerBootstrap.getInstance().KEY_OPENUSER).get();
            if (openUser == null) {
                // No user bound to this channel: skip the message. This path returns
                // before the queue-size trace below, as the original code did.
                Out.trace("OpenSender开始send,openUser为空:", "取消发送");
                return;
            }
            message.header.type = TypeProtocol.TYPE_RES;
            message.channel.writeAndFlush(message);
            Out.trace("OpenSender开始send:" + message);
        }
        Out.trace("处理后消息队列大小：" + size());
    }

    /**
     * @return the number of messages currently waiting to be sent
     */
    public int size() {
        return __QUEUE__.size();
    }
}
